package net.bwie.realtime.jtp.dim.Function;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RichFilterFunction;

import java.util.*;

/**
 * Filter that keeps only change records belonging to a configured set of tables
 * and carrying one of the supported operation types.
 *
 * <p>Each input element is a JSON string. The filter reads the {@code table_name}
 * and {@code operate_type} fields and accepts the record only when the table name
 * is contained in the set passed to the constructor and the operation type is one
 * of {@code insert}, {@code update} or {@code delete}.
 */
public class DimDataFilterFunction extends RichFilterFunction<String> {

    private static final long serialVersionUID = 1L;

    /**
     * Operation types that are allowed through the filter.
     *
     * <p>Held as a shared, unmodifiable constant rather than a per-instance
     * double-brace-initialized set: the latter creates an anonymous inner class
     * that keeps a hidden reference to the enclosing function instance.
     */
    private static final Set<String> OPERATE_TYPES =
            Collections.unmodifiableSet(new HashSet<>(Arrays.asList("insert", "update", "delete")));

    /** Names of the tables whose records should be kept. */
    private final Set<String> dimSet;

    /**
     * Creates a filter for the given table names.
     *
     * @param dimSet table names whose records pass the filter; must not be {@code null}
     * @throws NullPointerException if {@code dimSet} is {@code null}
     */
    public DimDataFilterFunction(Set<String> dimSet) {
        // Fail at construction time instead of on the first record processed.
        this.dimSet = Objects.requireNonNull(dimSet, "dimSet must not be null");
    }

    /**
     * Decides whether a JSON record should be kept.
     *
     * @param value JSON string expected to hold {@code table_name} and {@code operate_type}
     * @return {@code true} when the table name is in the configured set and the operation
     *         type is supported; {@code false} otherwise, including when {@code value}
     *         does not parse to a JSON object (null, empty, or JSON {@code null})
     * @throws Exception if {@code value} is not valid JSON (propagated from the parser)
     */
    @Override
    public boolean filter(String value) throws Exception {
        JSONObject jsonObject = JSON.parseObject(value);
        // parseObject yields null for null/empty/"null" input; drop such records
        // rather than failing with a NullPointerException below.
        if (jsonObject == null) {
            return false;
        }

        // Step 1: read the table name.
        String tableName = jsonObject.getString("table_name");
        // Step 2: read the operation type.
        String operateType = jsonObject.getString("operate_type");
        // Missing fields can never match; checking explicitly also keeps the lookup
        // safe for set implementations that reject null elements.
        if (tableName == null || operateType == null) {
            return false;
        }

        // Step 3: keep the record only if both the table and the operation match.
        return dimSet.contains(tableName) && OPERATE_TYPES.contains(operateType);
    }
}
